package net.wicp.tams.duckula.plugin.redis;

import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;
import net.wicp.tams.common.others.RedisClient;
import net.wicp.tams.commons.Conf;
import net.wicp.tams.commons.apiext.IOUtil;
import net.wicp.tams.commons.constant.OptType;
import net.wicp.tams.duckula.plugin.ReceiveAbs;
import net.wicp.tams.duckula.plugin.Utils;
import net.wicp.tams.duckula.plugin.bean.DuckulaPackage;
import net.wicp.tams.duckula.plugin.bean.Rule;
import net.wicp.tams.duckula.plugin.bean.SingleRecord;
import net.wicp.tams.duckula.plugin.constant.RuleItem;
import redis.clients.jedis.Jedis;

@Slf4j
public class ReceiveRedis extends ReceiveAbs {

	public ReceiveRedis(JSONObject paramObjs) {
		super(paramObjs);
		if (super.params == null || super.params.size() == 0) {
			log.error("需要redis相关参数");
			throw new RuntimeException("需要redis相关参数");
		}
		Properties props = IOUtil.fileToProperties(new File(
				IOUtil.mergeFolderAndFilePath(System.getenv("DUCKULA_HOME"), "/conf/duckula-plugin-redis.properties")));
		String host = String.valueOf(super.params.get("host"));
		String port = String.valueOf(super.params.get("port"));
		props.setProperty("common.others.redisserver.host", host);
		props.setProperty("common.others.redisserver.port", port);
		props.setProperty("common.others.redisserver.password", "Hammer_2017");
		Conf.overProp(props);
	}

	@Override
	public boolean receiveMsg(DuckulaPackage duckulaPackage, Rule rule) {
		String splitKey = rule.getSplitKey() == null ? duckulaPackage.getEventTable().getCols()[0] : rule.getSplitKey();
		Jedis jedis = RedisClient.getConnection();
		OptType optType = duckulaPackage.getEventTable().getOptType();
		for (int i = 0; i < duckulaPackage.getRowsNum(); i++) {
			Map<String, String> data = Utils.getUseData(duckulaPackage, i);
			String keyValue = String.format(rule.getItems().get(RuleItem.key), data.get(splitKey));
			if (optType == OptType.delete) {
				jedis.del(keyValue);
			} else {
				jedis.set(keyValue, JSONObject.toJSONString(data));
			}
		}
		RedisClient.returnResource(jedis);
		return true;
	}

	@Override
	public boolean receiveMsg(List<SingleRecord> data, Rule rule) {
		Jedis jedis = RedisClient.getConnection();
		for (SingleRecord singleRecord : data) {
			if (singleRecord.getOptType() == OptType.delete) {
				jedis.del(singleRecord.getKey());
			} else {
				try {
					jedis.set(singleRecord.getKey().getBytes("UTF-8"), singleRecord.getData());
				} catch (UnsupportedEncodingException e) {
				}
			}
		}
		RedisClient.returnResource(jedis);
		return true;
	}
}
